# Source code for qtealeaves.observables.correlation

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
The correlation observable measures the correlation between
two operators. It cannot be used for fermionic operators which
require a Jordan-Wigner transformation. Thus, the correlation
is of type ``<A_i B_j>``. The index ``i`` runs over
the rows of a matrix, and the index ``j`` over the columns.

( <A_1 B_1>   <A_1 B_2>   <A_1 B_3>   ... )
( <A_2 B_1>   <A_2 B_2>   <A_2 B_3>   ... )
( <A_3 B_1>   <A_3 B_2>   <A_3 B_3>   ... )
(    ...         ...         ...      ... )
"""

# pylint: disable=too-many-locals

import numpy as np

from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError

from .tnobase import _TNObsBase

__all__ = ["TNObsCorr", "TNObsCorr4"]


class TNObsCorr(_TNObsBase):
    """
    The correlation observable measures the correlation between
    two operators. It cannot be used for fermionic operators which
    require a Jordan-Wigner transformation. Thus, the correlation
    is of type ``<A_i B_j>``. The index ``i`` is running over
    the rows of a matrix, the index ``j`` over the columns.

    .. code-block::

        ( <A_1 B_1>   <A_1 B_2>   <A_1 B_3>   ... )
        ( <A_2 B_1>   <A_2 B_2>   <A_2 B_3>   ... )
        ( <A_3 B_1>   <A_3 B_2>   <A_3 B_3>   ... )
        (    ...         ...         ...      ... )

    **Arguments**

    name : str
        Define a label under which we can find the observable in
        the result dictionary.

    operators : list of two strings
        Identifiers/strings for the operators to be measured.

    batch_size : int | None, optional
        None measures with a single iTPO. Any integer will be interpreted
        as batch size: after every batch_size entries a new iTPO is created.
        The batch size addresses the problem of memory needs of the iTPO
        correlation measurement with all terms at once; lower batch sizes
        reduce the memory cost and increasing the computational cost to
        some extent.
        Default to None (Measure all correlations in one iTPO).
    """

    def __init__(self, name, operators, batch_size=None):
        super().__init__(name)
        # One entry per registered observable; __iadd__ extends these
        # lists in parallel with self.name.
        self.operators = [operators]
        self.batch_size = [batch_size]
        # Ansaetze for which a python-side measurement is implemented.
        self.measurable_ansaetze = ("MPS", "TTN", "TTO", "ATTN", "STATE")
[docs] @classmethod def empty(cls): """ Documentation see :func:`_TNObsBase.empty`. """ obj = cls(None, None) obj.name = [] obj.operators = [] obj.batch_size = [] return obj
def __len__(self): """ Provide appropriate length method. """ return len(self.name) def __iadd__(self, other): """ Documentation see :func:`_TNObsBase.__iadd__`. """ if isinstance(other, TNObsCorr): self.name += other.name self.operators += other.operators self.batch_size += other.batch_size else: raise QTeaLeavesError("__iadd__ not defined for this type.") return self
[docs] def collect_operators(self): """ Documentation see :func:`_TNObsBase.collect_operators`. """ for elem in self.operators: yield (elem[0], "l") yield (elem[1], "r")
[docs] def to_itpo(self, operators, tensor_backend, num_sites, de_layer=None): """ Return an ITPO or ITPOs as iterator representing alltogether the correlation observables. In the case of the aTTN the function takes care of diagonal terms of corr matrix with the disentanglers on them. These diagonal terms are stored in the order of looping over ii and jj. The number of ITPOs can be tuned indirectly via the batch size, i.e., after each batch_size terms an ITPO is constructed and yielded plus one with the potentially remaining terms. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state de_layer : DELayer or `None`, optional Disentangler layer for which the iTPO layer is created Default to `None` (standard TN with no disentanglers) Returns ------- ITPO The ITPO class Details ------- ITPOs can be costly in memory and we are able to tune their memory footprint. The batch size purely counts terms and can for example contain contributions for two correlation measurements. An example how to parse the output can be found in the `tn_simulations.py`. """ if de_layer is None: yield from self._to_itpo(operators, tensor_backend, num_sites) else: yield from self._to_itpo_de_layer( operators, tensor_backend, num_sites, de_layer )
    def _to_itpo(self, operators, tensor_backend, num_sites):
        """
        Return an ITPO or ITPOs as iterator representing alltogether the
        correlation observables.

        The number of ITPOs can be tuned indirectly via the batch size,
        i.e., after each batch_size terms an ITPO is constructed and
        yielded plus one with the potentially remaining terms.

        Parameters
        ----------
        operators: TNOperators
            The operator class
        tensor_backend: instance of `TensorBackend`
            Tensor backend of the simulation
        num_sites: int
            Number of sites of the state

        Returns
        -------
        ITPO
            The ITPO class
        """
        dense_mpo_list = DenseMPOList()
        for kk, _ in enumerate(self.name):
            # Operator keys for the rows (a) and columns (b) of the matrix.
            key_a = self.operators[kk][0]
            key_b = self.operators[kk][1]

            for ii in range(num_sites):
                for jj in range(num_sites):
                    # Diagonal entries are local terms and not added here.
                    if ii == jj:
                        continue

                    site_a = MPOSite(
                        ii, key_a, 1.0, 1.0, operators=operators, params={}
                    )
                    site_b = MPOSite(
                        jj, key_b, 1.0, 1.0, operators=operators, params={}
                    )
                    dense_mpo = DenseMPO(
                        [site_a, site_b], tensor_backend=tensor_backend
                    )
                    dense_mpo_list.append(dense_mpo)

                    # Once the batch is full, emit an ITPO and reset.
                    if len(dense_mpo_list) == self.batch_size[kk]:
                        dense_mpo_list = dense_mpo_list.sort_sites()
                        itpo = ITPO(num_sites)
                        itpo.add_dense_mpo_list(dense_mpo_list)

                        yield itpo

                        # Start with new empty
                        dense_mpo_list = DenseMPOList()

        if len(dense_mpo_list) == 0:
            # We finished exactly on the last term
            return

        # Sites are not ordered and we have to make links match anyways
        dense_mpo_list = dense_mpo_list.sort_sites()
        itpo = ITPO(num_sites)
        itpo.add_dense_mpo_list(dense_mpo_list)

        yield itpo

    def _to_itpo_de_layer(self, operators, tensor_backend, num_sites, de_layer):
        """
        Return an ITPO or ITPOs as iterator representing alltogether the
        correlation observables. In the case of the aTTN the function
        takes care of diagonal terms of corr matrix with the
        disentanglers on them. These diagonal terms are stored in the
        order of looping over ii and jj.

        The number of ITPOs can be tuned indirectly via the batch size,
        i.e., after each batch_size terms an ITPO is constructed and
        yielded plus one with the potentially remaining terms.

        Parameters
        ----------
        operators: TNOperators
            The operator class
        tensor_backend: instance of `TensorBackend`
            Tensor backend of the simulation
        num_sites: int
            Number of sites of the state
        de_layer : DELayer
            Disentangler layer for which the iTPO layer is created

        Returns
        -------
        ITPO
            The ITPO class
        """
        # only if aTTN
        dense_mpo_list = DenseMPOList()
        for kk, _ in enumerate(self.name):
            key_a = self.operators[kk][0]
            key_b = self.operators[kk][1]

            # get all operators.ops[]
            # key_ab / key_id are generated once (from the first operator
            # set) and reused as the registration key for every set.
            key_ab = None
            key_id = None
            for oset_name in operators.set_names:
                op_a = operators[(oset_name, key_a)]
                op_b = operators[(oset_name, key_b)]

                # get the operator a x b for the diag entries
                if op_a.ndim == 2:
                    op_ab = op_a @ op_b
                else:
                    # Assume rank-4 (but delta charge both times to the right
                    # Assume that tensor_a and tensor_b have same link dimensions
                    op_ab = op_a.tensordot(op_b, ([2], [1]))
                    op_ab = op_ab.transpose([3, 0, 1, 4, 2, 5])
                    op_ab.fuse_links_update(4, 5)
                    op_ab.fuse_links_update(0, 1, False)

                # id() of the first product tensor serves as a unique key.
                key_ab = str(id(op_ab)) if key_ab is None else key_ab
                operators[(oset_name, key_ab)] = op_ab

                # get the matching identity (is it not always in operators?)
                op_identity = op_a.eye_like(op_a.links[1])
                op_identity.attach_dummy_link(0, is_outgoing=False)
                op_identity.attach_dummy_link(3, is_outgoing=True)

                key_id = str(id(op_identity)) if key_id is None else key_id
                operators[(oset_name, key_id)] = op_identity

            # fill the itpo with correct operators on correct sites
            for ii in range(num_sites):
                for jj in range(num_sites):
                    if ii == jj and (ii in de_layer.de_sites):
                        # Diagonal entries of correlation matrix for sites that have
                        # a disentangler attached to it cannot longer be measured as
                        # local terms, and thus the appropriate identity operator needs
                        # to be added on the other site of the disentangler
                        # (Warning: if we add them here, we should more or less have control
                        # over the TPO-id, if they are added by the contract-DE-layer logic,
                        # the TPO-id will be added at the end messing up our loop)
                        where = np.where(de_layer.de_sites == ii)

                        # we assume max 1 disentangler per site
                        ind = [where[0][0], where[1][0]]
                        # The partner site is the other column of the same
                        # disentangler row (columns 0 and 1 swap via abs(x - 1)).
                        site_identity = de_layer.de_sites[ind[0], abs(ind[1] - 1)]

                        site_a = MPOSite(
                            ii, key_ab, 1.0, 1.0, operators=operators, params={}
                        )
                        site_b = MPOSite(
                            site_identity,
                            key_id,
                            1.0,
                            1.0,
                            operators=operators,
                            params={},
                        )
                    elif ii == jj:
                        # Plain diagonal entries without disentangler are
                        # local terms and not added here.
                        continue
                    else:
                        site_a = MPOSite(
                            ii, key_a, 1.0, 1.0, operators=operators, params={}
                        )
                        site_b = MPOSite(
                            jj, key_b, 1.0, 1.0, operators=operators, params={}
                        )

                    dense_mpo = DenseMPO(
                        [site_a, site_b], tensor_backend=tensor_backend
                    )
                    dense_mpo_list.append(dense_mpo)

                    # Once the batch is full, emit an ITPO and reset.
                    if len(dense_mpo_list) == self.batch_size[kk]:
                        dense_mpo_list = dense_mpo_list.sort_sites()
                        itpo = ITPO(num_sites)
                        itpo.add_dense_mpo_list(dense_mpo_list)

                        yield itpo

                        # Start with new empty
                        dense_mpo_list = DenseMPOList()

        if len(dense_mpo_list) == 0:
            # We finished exactly on the last term
            return

        # Sites within dense mpos are not ordered and we have to make links match anyways
        dense_mpo_list = dense_mpo_list.sort_sites()
        itpo = ITPO(num_sites)
        itpo.add_dense_mpo_list(dense_mpo_list)
        # NOTE(review): the measurement status is set only on this final
        # ITPO and not on the batched ones yielded above, nor in
        # _to_itpo — looks asymmetric; confirm whether this is intended.
        itpo.set_meas_status(do_measurement=True)

        yield itpo
[docs] def read(self, fh, **kwargs): """ Read the measurements of the correlation observable from fortran. **Arguments** fh : filehandle Read the information about the measurements from this filehandle. """ # First line is separator _ = fh.readline() is_meas = fh.readline().replace("\n", "").replace(" ", "") is_measured = is_meas == "T" for elem in self.name: if is_measured: realcompl = fh.readline() if "C" in realcompl: str_values_r = fh.readline() str_values_c = fh.readline() real_val = np.array(list(map(float, str_values_r.split()))) compl_val = np.array(list(map(float, str_values_c.split()))) vect = real_val + 1j * compl_val numlines = real_val.shape[0] # Build numpy matrix res = np.zeros((numlines, numlines), dtype=vect.dtype) res[:, 0] = vect # Now we now the number of entries after reading the # first line and (numlines - 1) lines are left for ii in range(1, numlines): str_values_r = fh.readline() str_values_c = fh.readline() real_val = np.array(list(map(float, str_values_r.split()))) compl_val = np.array(list(map(float, str_values_c.split()))) res[:, ii] = real_val + 1j * compl_val yield elem, res if "R" in realcompl: str_values_r = fh.readline() real_val = np.array(list(map(float, str_values_r.split()))) numlines = real_val.shape[0] # Build numpy matrix res = np.zeros((numlines, numlines), dtype=real_val.dtype) res[:, 0] = real_val for ii in range(1, numlines): str_values_r = fh.readline() real_val = np.array(list(map(float, str_values_r.split()))) res[:, ii] = real_val yield elem, res else: yield elem, None
[docs] def read_hdf5(self, dataset, params): """ The data is written into groups, each containing the dataset real, and complex variables also containing a dataset imag. Check if imag exists and sum them up. The names of the groups are (0-based) integers which correspond to the names of the observables. **Arguments** dataset : h5py dataset The dataset to be read out of. params : parameter dictionary Unused for now (Dec 2023) """ _ = params # unused for now for ii, name in enumerate(self.name): real = dataset[str(ii) + "/real"][()] if "imag" in dataset[str(ii)]: imag = dataset[str(ii) + "/imag"][()] yield name, real + 1j * imag else: yield ii, real
[docs] def write_results(self, fh, state_ansatz, **kwargs): """ See :func:`_TNObsBase.write_results`. """ is_measured = self.check_measurable(state_ansatz) # Write separator first fh.write("-" * 20 + "tnobscorr\n") # Assignment for the linter _ = fh.write("T \n") if is_measured else fh.write("F \n") if is_measured: for name_ii in self.name: # Write column by column because this is favored # by the fortran memory corr_mat = self.results_buffer[name_ii] real_mat = np.real(corr_mat) imag_mat = np.imag(corr_mat) if np.any(np.abs(imag_mat) > 1e-12): fh.write("C\n") for jj in range(real_mat.shape[1]): str_r = " ".join(list(map(str, list(real_mat[:, jj])))) str_c = " ".join(list(map(str, list(imag_mat[:, jj])))) fh.write(str_r + "\n") fh.write(str_c + "\n") else: fh.write("R\n") for jj in range(real_mat.shape[1]): str_r = " ".join(list(map(str, list(real_mat[:, jj])))) fh.write(str_r + "\n") self.results_buffer = {}
class TNObsCorr4(TNObsCorr):
    """
    Measure the correlation of four operators. This measurement
    does not do any projections. We skip entries where two or
    more of the four indices match, e.g., `i1 = i2`.

    **Arguments**

    name : str
        Define a label under which we can find the observable in
        the result dictionary.

    operators : list of four strings
        Identifiers / strings for the operators to be measured.

    **Details**

    Restrictions apply in the case of symmetric tensor, where the
    measurement might not be available even if it is available
    without symmetries.
    """

    # No python ansatz currently supports measuring this observable.
    # NOTE(review): the inherited TNObsCorr.__init__ assigns an instance
    # attribute `measurable_ansaetze`, which shadows this class-level
    # value on instances — confirm which one the measurement check reads.
    measurable_ansaetze = ()
[docs] @classmethod def empty(cls): """ Documentation see :func:`_TNObsBase.empty`. """ obj = cls(None, None) obj.name = [] obj.operators = [] return obj
[docs] def collect_operators(self): """ Documentation see :func:`_TNObsBase.collect_operators`. """ for elem in self.operators: for operator in elem: yield (operator, None)
    def read(self, fh, **kwargs):
        """
        Read the measurements of the correlation observable from fortran.

        **Arguments**

        fh : filehandle
            Read the information about the measurements from this filehandle.
        """
        # First line is separator
        _ = fh.readline()
        is_meas = fh.readline().replace("\n", "").replace(" ", "")
        is_measured = is_meas == "T"

        for elem in self.name:
            if not is_measured:
                yield elem, None
                continue

            # First column: a real-part line followed by an
            # imaginary-part line.
            str_values_r = fh.readline()
            str_values_c = fh.readline()
            real_val = np.array(list(map(float, str_values_r.split())))
            compl_val = np.array(list(map(float, str_values_c.split())))
            vect = real_val + 1j * compl_val
            dim = real_val.shape[0]

            # Build numpy matrix (rank-4 tensor, one index per operator).
            res = np.zeros((dim, dim, dim, dim), dtype=vect.dtype)
            res[:, 0, 0, 0] = vect
            first_line = True

            # Now we know the number of entries after reading the first
            # column; (dim**3 - 1) further real/imag line pairs follow,
            # one per remaining (j2, j3, j4) combination.
            for j4 in range(dim):
                for j3 in range(dim):
                    for j2 in range(dim):
                        if first_line:
                            # We have already read this line
                            first_line = False
                            continue

                        str_values_r = fh.readline()
                        str_values_c = fh.readline()
                        real_val = np.array(list(map(float, str_values_r.split())))
                        compl_val = np.array(list(map(float, str_values_c.split())))
                        res[:, j2, j3, j4] = real_val + 1j * compl_val

            yield elem, res
[docs] def read_hdf5(self, dataset, params): """ Documentation see :func:`_TNObsBase.read_hdf5`. """ raise QTeaLeavesError("This observables cannot read HDF5 files yet.")
[docs] def write_results(self, fh, state_ansatz, **kwargs): """ See :func:`_TNObsBase.write_results`. """ is_measured = self.check_measurable(state_ansatz) # Write separator first fh.write("-" * 20 + "tnobscorr4\n") # Assignment for the linter _ = fh.write("T \n") if is_measured else fh.write("F \n") if is_measured: for name_ii in self.name: # Write column by column because this is favored # by the fortran memory corr_mat = self.results_buffer[name_ii] real_mat = np.real(corr_mat) imag_mat = np.imag(corr_mat) dim = real_mat.shape[1] for j4 in range(dim): for j3 in range(dim): for j2 in range(dim): map_r = map(str, list(real_mat[:, j2, j3, j4])) map_c = map(str, list(imag_mat[:, j2, j3, j4])) str_r = " ".join(list(map_r)) str_c = " ".join(list(map_c)) fh.write(str_r + "\n") fh.write(str_c + "\n") self.results_buffer = {}